/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.api.scheduler;

import com.chinamobile.cmss.lakehouse.api.service.LakehouseInstanceService;
import com.chinamobile.cmss.lakehouse.common.dto.ClusterCreateStatus;
import com.chinamobile.cmss.lakehouse.common.dto.LakehouseCheckRequest;
import com.chinamobile.cmss.lakehouse.common.enums.ClusterStatusTypeEnum;
import com.chinamobile.cmss.lakehouse.common.enums.ClusterTypeEnum;
import com.chinamobile.cmss.lakehouse.dao.LakehouseClusterInfoDao;
import com.chinamobile.cmss.lakehouse.dao.entity.LakehouseClusterInfoEntity;
import com.chinamobile.cmss.lakehouse.service.resource.LakehouseResourceService;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Reconciles the persisted status of lakehouse clusters with the state reported by
 * {@link LakehouseResourceService} and {@link LakehouseInstanceService}.
 *
 * <p>Two independent scans are offered:
 * <ul>
 *   <li>{@link #monitorAndUpdateClusterStatus()} follows clusters that are still being
 *       created (ACCEPTED / DEPLOYING / CONFIGURING) until they are RUNNING or time out.</li>
 *   <li>{@link #monitorActiveClusters()} flips clusters between RUNNING and ERROR according
 *       to their reported health.</li>
 * </ul>
 *
 * <p>NOTE(review): nothing in this class triggers the scans; presumably an external
 * scheduler calls them periodically - confirm against the caller.
 */
@Component
@Slf4j
public class LakehouseClusterMonitor {

    /** Stage marker: the readiness check is still waiting on the K8s step. */
    private static final String K8S = "K8s";
    /** Stage marker: the readiness check is still waiting on the Router step. */
    private static final String ROUTER = "Router";
    /** Stage marker: the readiness check is still waiting on the Hive step. */
    private static final String HIVE = "Hive";
    /** Maximum age of a cluster, counted from its create time, before creation is declared failed. */
    private static final int DEPLOY_TIMEOUT_SECONDS = 300;
    /** Same timeout in milliseconds; computed as a long so a larger timeout cannot overflow int. */
    private static final long DEPLOY_TIMEOUT_MILLIS = DEPLOY_TIMEOUT_SECONDS * 1000L;
    /** Status values that mean "creation still in progress"; populated once in the static initializer. */
    private static final List<String> CREATING = new ArrayList<>();

    static {
        CREATING.add(ClusterStatusTypeEnum.ACCEPTED.getStatus());
        CREATING.add(ClusterStatusTypeEnum.DEPLOYING.getStatus());
        CREATING.add(ClusterStatusTypeEnum.CONFIGURING.getStatus());
    }

    @Resource
    LakehouseClusterInfoDao lakehouseClusterInfoDao;

    @Autowired
    LakehouseInstanceService lakehouseInstanceService;

    @Autowired
    LakehouseResourceService lakehouseResourceService;

    /**
     * Scans every cluster whose status is ACCEPTED, DEPLOYING or CONFIGURING and advances it.
     *
     * <p>A cluster older than {@link #DEPLOY_TIMEOUT_SECONDS} is marked CREATE_FAILED.
     * Otherwise the readiness check decides: RUNNING when it passes, CONFIGURING when the
     * reported stage mentions Router or Hive, DEPLOYING when it only mentions K8s.
     * Rows are saved only when the status actually changes.
     */
    public void monitorAndUpdateClusterStatus() {
        List<LakehouseClusterInfoEntity> acceptedClusters = lakehouseClusterInfoDao.findByStatusIn(CREATING);
        if (acceptedClusters == null || acceptedClusters.isEmpty()) {
            log.warn("there is no new accepted clusters now!");
            return;
        }
        log.info("scan accepted clusters: size:{}", acceptedClusters.size());

        for (LakehouseClusterInfoEntity clusterInfoEntity : acceptedClusters) {
            if (isDeployTimedOut(clusterInfoEntity)) {
                changeClusterStatus(clusterInfoEntity, ClusterStatusTypeEnum.CREATE_FAILED.getStatus());
            } else {
                refreshCreatingStatus(clusterInfoEntity);
            }
        }
    }

    /**
     * Tells whether the cluster has been in creation for longer than the deploy timeout.
     *
     * @param clusterInfoEntity cluster under creation
     * @return {@code true} when the create time is known and older than the timeout;
     *         {@code false} otherwise, including when the create time is missing
     */
    private boolean isDeployTimedOut(LakehouseClusterInfoEntity clusterInfoEntity) {
        Date createAt = clusterInfoEntity.getCreateTime();
        if (createAt == null) {
            // Without a create time the age is unknown; keep probing instead of failing the whole scan.
            log.warn("cluster:{} has no create time, deploy timeout can not be evaluated",
                clusterInfoEntity.getInstance());
            return false;
        }
        return System.currentTimeMillis() - createAt.getTime() > DEPLOY_TIMEOUT_MILLIS;
    }

    /**
     * Runs the readiness check for one cluster and maps its outcome to a status.
     *
     * @param clusterInfoEntity cluster under creation that has not timed out
     */
    private void refreshCreatingStatus(LakehouseClusterInfoEntity clusterInfoEntity) {
        LakehouseCheckRequest request =
            LakehouseCheckRequest.builder().instance(clusterInfoEntity.getInstance())
                .engineType(clusterInfoEntity.getEngineType()).build();
        ClusterCreateStatus result = lakehouseResourceService.checkClusterConfigOk(request);
        log.info("cluster:{} stage:{}", clusterInfoEntity.getInstance(), result);
        if (result == null) {
            // Nothing to act on; leave the status untouched and retry on the next scan.
            log.warn("cluster:{} readiness check returned no result, status is kept",
                clusterInfoEntity.getInstance());
            return;
        }
        if (result.isResult()) {
            changeClusterStatus(clusterInfoEntity, ClusterStatusTypeEnum.RUNNING.getStatus());
            return;
        }

        String stage = result.getStage();
        if (stage == null) {
            // Not ready and no stage reported: there is no basis for a status change.
            return;
        }
        // Router/Hive wins over K8s when both markers are present, so the row is written at
        // most once per scan and never flips DEPLOYING -> CONFIGURING within a single pass.
        if (stage.contains(ROUTER) || stage.contains(HIVE)) {
            changeClusterStatus(clusterInfoEntity, ClusterStatusTypeEnum.CONFIGURING.getStatus());
        } else if (stage.contains(K8S)) {
            changeClusterStatus(clusterInfoEntity, ClusterStatusTypeEnum.DEPLOYING.getStatus());
        }
    }

    /**
     * Persists a new status for the cluster, but only when it differs from the current one.
     *
     * @param clusterInfoEntity cluster to update
     * @param newStatus status value to store
     */
    private void changeClusterStatus(LakehouseClusterInfoEntity clusterInfoEntity, String newStatus) {
        String oldStatus = clusterInfoEntity.getStatus();
        if (!Objects.equals(oldStatus, newStatus)) {
            log.info("cluster:{} status update {} ------> {}", clusterInfoEntity.getInstance(), oldStatus, newStatus);
            clusterInfoEntity.setStatus(newStatus);
            lakehouseClusterInfoDao.save(clusterInfoEntity);
        }
    }

    /**
     * Compares the stored status of RUNNING and ERROR lakehouse clusters with their reported
     * health and updates the ones that disagree.
     *
     * <p>RUNNING clusters reported as unhealthy become ERROR; ERROR clusters reported as
     * healthy become RUNNING. Clusters with no health entry, or with an entry that is not a
     * {@link Boolean}, are left unchanged.
     */
    public void monitorActiveClusters() {
        log.info("monitor active clusters.");
        List<LakehouseClusterInfoEntity> runningInstanceList = lakehouseClusterInfoDao.findByStatusAndClusterType(
            ClusterStatusTypeEnum.RUNNING.getStatus(), ClusterTypeEnum.LAKEHOUSE.toString());
        List<LakehouseClusterInfoEntity> errorInstanceList = lakehouseClusterInfoDao.findByStatusAndClusterType(
            ClusterStatusTypeEnum.ERROR.getStatus(), ClusterTypeEnum.LAKEHOUSE.toString());
        if (runningInstanceList.isEmpty() && errorInstanceList.isEmpty()) {
            return;
        }
        List<LakehouseClusterInfoEntity> totalInstanceList = new ArrayList<>(runningInstanceList);
        totalInstanceList.addAll(errorInstanceList);

        Map<String, Object> realStatus = lakehouseInstanceService.getInstanceListHealthStatus(totalInstanceList);
        if (realStatus == null || realStatus.isEmpty()) {
            return;
        }

        // RUNNING clusters that are reported unhealthy.
        List<LakehouseClusterInfoEntity> stopRunningInstanceList = markInstancesWithHealth(
            runningInstanceList, realStatus, false, ClusterStatusTypeEnum.ERROR.getStatus());
        // ERROR clusters that are reported healthy again.
        List<LakehouseClusterInfoEntity> stopErrorInstanceList = markInstancesWithHealth(
            errorInstanceList, realStatus, true, ClusterStatusTypeEnum.RUNNING.getStatus());

        // update in database
        if (!stopRunningInstanceList.isEmpty()) {
            lakehouseInstanceService.updateInstanceListStatus(ClusterStatusTypeEnum.ERROR.getStatus(),
                stopRunningInstanceList);
        }
        if (!stopErrorInstanceList.isEmpty()) {
            lakehouseInstanceService.updateInstanceListStatus(ClusterStatusTypeEnum.RUNNING.getStatus(),
                stopErrorInstanceList);
        }
    }

    /**
     * Selects the instances whose reported health equals {@code reportedHealth} and sets
     * {@code newStatus} on each selected entity in memory.
     *
     * @param instances candidates to inspect
     * @param realStatus health lookup keyed by instance id
     * @param reportedHealth health value that triggers the status change
     * @param newStatus status to set on every selected entity
     * @return the selected entities, in input order; empty when none matched
     */
    private List<LakehouseClusterInfoEntity> markInstancesWithHealth(List<LakehouseClusterInfoEntity> instances,
        Map<String, Object> realStatus, boolean reportedHealth, String newStatus) {
        List<LakehouseClusterInfoEntity> changed = new ArrayList<>();
        Boolean expected = Boolean.valueOf(reportedHealth);
        for (LakehouseClusterInfoEntity clusterInfoEntity : instances) {
            // equals() is false for null and for non-Boolean values, so neither can throw here.
            if (expected.equals(realStatus.get(clusterInfoEntity.getInstanceId()))) {
                clusterInfoEntity.setStatus(newStatus);
                changed.add(clusterInfoEntity);
            }
        }
        return changed;
    }

}
